package com.zhang.first.day06;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * Flink job: read a text file line-by-line and sink every line, unchanged,
 * into the Kafka topic {@code flink_to_kafka}.
 *
 * <p>Usage: an optional first CLI argument overrides the input file path;
 * with no arguments the original hard-coded sample path is used, so existing
 * invocations keep working.
 *
 * @title: sink to kafka
 * @author: zhang
 * @date: 2022/1/20 15:19
 */
public class Example7 {

    /** Fallback input file when no CLI argument is supplied. */
    private static final String DEFAULT_INPUT_PATH =
            "/Users/apple/IdeaProjects/flink_1.13/src/main/resources/UserBehavior.csv";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel task keeps the output ordering of the file intact.
        env.setParallelism(1);

        // Kafka producer configuration: broker address only; everything else
        // uses the producer defaults.
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

        // Input path: first CLI argument if given, otherwise the sample file.
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;

        env
                .readTextFile(inputPath)
                .addSink(new FlinkKafkaProducer<String>(
                        "flink_to_kafka",          // target topic
                        new SimpleStringSchema(),  // each line serialized as a UTF-8 string
                        properties
                ));
        // NOTE(review): this FlinkKafkaProducer constructor is documented to
        // default to at-least-once semantics — confirm that duplicates on
        // failure/restart are acceptable for downstream consumers.
        env.execute();
    }
}
